LakeSoul CDC Ingestion Table
CDC (Change Data Capture) is an important data source for the Lakehouse. The goal of the LakeSoul CDC ingestion table is to sync changes from an online OLTP database into LakeSoul with very low latency, usually several minutes, so that downstream analytics can get the newest results as soon as possible without a traditional T+1 database dump.
LakeSoul uses an extra change operation column (the column name is configurable) to model CDC data, and can consume CDC sources including Debezium, Canal, and Flink CDC.
To create a LakeSoul CDC ingestion table, add a table property lakesoul_cdc_change_column with the name of the column that records the change type. This column should be of string type and contain one of three values: update, insert, delete.
During the merge phase of a read job, only the newest records of type update and insert are kept, and delete records are filtered out automatically.
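The merge-on-read semantics above can be illustrated with a small stand-alone Scala sketch. The Row case class, the version field, and the mergeCdc function are all hypothetical names used for illustration; they are not part of the LakeSoul API.

```scala
// Illustrative model of CDC merge-on-read: per primary key, only the newest
// record survives, and it is dropped entirely if its change type is delete.
// Row and mergeCdc are hypothetical, not LakeSoul APIs.
case class Row(id: String, value: String, changeType: String, version: Long)

def mergeCdc(rows: Seq[Row]): Seq[Row] =
  rows
    .groupBy(_.id)                    // group CDC records by primary key
    .values
    .map(_.maxBy(_.version))          // keep only the newest record per key
    .filter(_.changeType != "delete") // drop keys whose latest op is delete
    .toSeq

val history = Seq(
  Row("1", "a", "insert", 1),
  Row("1", "b", "update", 2), // newest state of key "1"
  Row("2", "c", "insert", 1),
  Row("2", "c", "delete", 2)  // key "2" was deleted, so it is filtered out
)

val merged = mergeCdc(history)
// merged contains only key "1" with its updated value "b"
```

A real read job performs this merge per hash bucket over sorted delta files, but the visible result is the same: readers see the latest non-deleted row for each primary key.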
Create LakeSoul CDC Ingestion Table
Use the Scala API or SQL, assuming the change operation column name is change_type.

Scala:

import com.dmetasoul.lakesoul.tables.LakeSoulTable

LakeSoulTable.createTable(data, path)
  .shortTableName("cdc_ingestion")
  .hashPartitions("id")
  .hashBucketNum(2)
  .rangePartitions("rangeid")
  .tableProperty("lakesoul_cdc_change_column" -> "change_type")
  .create()

SQL:

CREATE TABLE table_name (id string, date string, change_type string) USING lakesoul
PARTITIONED BY (date)
LOCATION 's3://lakesoul-bucket/table_path'
TBLPROPERTIES('lakesoul_cdc_change_column'='change_type',
'hashPartitions'='id',
'hashBucketNum'='2');
Note that a LakeSoul CDC ingestion table must have primary key(s), and the primary key(s) should be the same as those of the online OLTP table.
Incremental Read for LakeSoul CDC Table
LakeSoul adopts primary key sharding for incremental upserts, so incremental data does not need to be merged with existing data at write time. For CDC tables, the delta data is the content of the original CDC stream, so an incremental read of a LakeSoul CDC table can fully retain the CDC operation flags: insert, update, and delete. The current version, 2.2.0, already supports incremental streaming reads in Spark. The next version will release a Flink streaming source that supports reading the table incrementally as a Flink ChangeLog Stream.
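The difference from the merged read described earlier is that an incremental read over a version (or timestamp) range returns the raw CDC records with their operation flags intact, so a downstream job can replay them as a changelog. A minimal stand-alone Scala sketch, where Change and readIncremental are hypothetical names and not LakeSoul APIs:

```scala
// Illustrative model of an incremental CDC read: records in the requested
// range are returned as-is, operation flags included, instead of being
// merged and having deletes filtered out.
// Change and readIncremental are hypothetical, not LakeSoul APIs.
case class Change(id: String, value: String, changeType: String, version: Long)

// Return all CDC records with version in (since, until].
def readIncremental(log: Seq[Change], since: Long, until: Long): Seq[Change] =
  log.filter(c => c.version > since && c.version <= until)

val log = Seq(
  Change("1", "a", "insert", 1),
  Change("1", "b", "update", 2),
  Change("2", "c", "insert", 2),
  Change("2", "c", "delete", 3)
)

val delta = readIncremental(log, since = 1, until = 3)
// delta still carries the update, insert, and delete flags, so a streaming
// consumer (e.g. a Flink ChangeLog Stream) can apply each operation in order
```

This is why the CDC incremental read can feed a changelog-style stream: a snapshot read would hide the delete of key "2" entirely, while the incremental read exposes it as an explicit delete event.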